package cn.doitedu.rtmk.demo.demo5;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class Demo5 {

    /**
     * DDL for the logical table mapped (via the mysql-cdc connector) onto the
     * rule-resource table doit35.rule_res in MySQL.
     */
    private static final String RULE_RES_CDC_DDL =
            "CREATE TABLE rule_res_cdc (   " +
            "      rule_id STRING,                                      " +
            "      rule_model_id STRING,                           " +
            "      rule_param_json STRING,                           " +
            "      rule_param_groovy_code STRING,                          " +
            "      rule_model_groovy_code STRING,                          " +
            "      status INT,                            " +
            "      creator STRING,                          " +
            "      create_time TIMESTAMP(3),                    " +
            "      update_time TIMESTAMP(3),                    " +
            "     PRIMARY KEY (rule_id) NOT ENFORCED            " +
            "     ) WITH (                                 " +
            "     'connector' = 'mysql-cdc',               " +
            "     'hostname' = 'doitedu'   ,               " +
            "     'port' = '3306'          ,               " +
            "     'username' = 'root'      ,               " +
            "     'password' = 'root'      ,               " +
            "     'database-name' = 'doit35',          " +
            "     'table-name' = 'rule_res'               " +
            ")";

    /**
     * Entry point. Wires up the real-time marketing demo pipeline:
     * user-behavior events from Kafka are keyed by user id and connected with a
     * broadcast stream of rule resources captured from MySQL via CDC; the
     * {@code CoreEngineFunction} evaluates the rules and emits {@code RtmkMessage}s.
     */
    public static void main(String[] args) throws Exception {
        // Set up the stream environment with 10s checkpointing to a local directory.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1) Read the user-behavior event log (json) from Kafka and parse into beans.
        DataStreamSource<String> rawEvents =
                env.fromSource(buildEventSource(), WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> eventStream =
                rawEvents.map(json -> JSON.parseObject(json, EventBean.class));

        // 2) Watch the doit35.rule_res rule-resource table in MySQL with the CDC connector.
        tableEnv.executeSql(RULE_RES_CDC_DDL);

        DataStream<Row> ruleChangelog = tableEnv.toChangelogStream(tableEnv.from("rule_res_cdc"));
        SingleOutputStreamOperator<RuleResourceBean> ruleResources =
                ruleChangelog.map(rowToRuleResource());

        // Broadcast state layout => key: rule id (String), value: rule resource wrapper object.
        BroadcastStream<RuleResourceBean> broadcastRules =
                ruleResources.broadcast(StateDescriptors.RULE_RES_STATE_DESC);

        // 3) Connect keyed events with the broadcast rules and run the core engine.
        SingleOutputStreamOperator<RtmkMessage> engineOutput =
                eventStream.keyBy(EventBean::getUserId)
                        .connect(broadcastRules)
                        .process(new CoreEngineFunction());

        engineOutput.print();
        env.execute();
    }

    /** Builds the Kafka source delivering raw event-log json strings (latest offsets only). */
    private static KafkaSource<String> buildEventSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                // timestamp suffix gives a fresh consumer group per run (always starts from latest)
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-test-data")
                .build();
    }

    /**
     * Converts a changelog {@link Row} from the CDC table into a {@link RuleResourceBean},
     * carrying the changelog kind ({@code RowKind} byte value) as the operate type.
     * Kept as an anonymous class (not a lambda) so Flink's type extraction works unchanged.
     */
    private static MapFunction<Row, RuleResourceBean> rowToRuleResource() {
        return new MapFunction<Row, RuleResourceBean>() {
            @Override
            public RuleResourceBean map(Row row) throws Exception {
                String ruleId = row.getFieldAs("rule_id");
                String ruleModelId = row.getFieldAs("rule_model_id");
                String ruleParamJson = row.getFieldAs("rule_param_json");
                String ruleParamGroovyCode = row.getFieldAs("rule_param_groovy_code");
                String ruleModelGroovyCode = row.getFieldAs("rule_model_groovy_code");
                int status = row.getFieldAs("status");
                int operateType = row.getKind().toByteValue();

                return new RuleResourceBean(operateType, ruleId, ruleModelId, ruleParamJson,
                        ruleParamGroovyCode, ruleModelGroovyCode, status, null, null, null);
            }
        };
    }

}
